Conversation
56cc73b to 3372321
… Arrow on JDK9+

### What changes were proposed in this pull request?

This PR aims to add `io.netty.tryReflectionSetAccessible=true` to the testing configuration for JDK11 because this is an officially documented requirement of Apache Arrow. The Apache Arrow community documented this requirement at `0.15.0` ([ARROW-6206](apache/arrow#5078)).

> #### For java 9 or later, should set "-Dio.netty.tryReflectionSetAccessible=true".
> This fixes `java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available`. thrown by netty.

### Why are the changes needed?

After ARROW-3191, the Arrow Java library requires the property `io.netty.tryReflectionSetAccessible` to be set to true for JDK >= 9. After apache#26133, the JDK11 Jenkins jobs seem to fail.

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/676/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/677/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/678/

```scala
Previous exception in task: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
	io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with JDK11.

Closes apache#26552 from dongjoon-hyun/SPARK-ARROW-JDK11.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
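For context, a minimal sketch of what the documented requirement amounts to at the JVM level; the object name below is hypothetical, and the PR itself only touches the JDK11 test configuration rather than application code:

```scala
// Illustrative only: the flag added to the JDK11 test configuration is a plain JVM
// system property, so the equivalent of -Dio.netty.tryReflectionSetAccessible=true
// can also be set programmatically, as long as it happens before Netty/Arrow classes load.
object EnableNettyReflectiveAccess {
  def main(args: Array[String]): Unit = {
    System.setProperty("io.netty.tryReflectionSetAccessible", "true")
    // ... start Spark / Arrow work after this point
  }
}
```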
viirya left a comment:
So basically, how nested column predicates are pushed down is implemented by each v2 data source. The main changes here look like:
- Add the v2 Filter and its subclasses.
- Translate Catalyst predicates to v2 Filters (see the sketch after this list).
- Replace the v1 Filter with the v2 Filter in the Orc filter helper.
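As a rough illustration of the translation step (the helper name and the exact pattern below are assumptions for this sketch, not necessarily the PR's code):

```scala
// Hypothetical sketch of translating a Catalyst predicate into a v2 Filter.
// EqualTo/FilterV2 are the classes added in this PR (package as shown in the diff);
// translateFilterV2 is an assumed helper name.
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, EqualTo => CatalystEqualTo}
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.sources.v2.{EqualTo, FilterV2}

def translateFilterV2(predicate: Expression): Option[FilterV2] = predicate match {
  case CatalystEqualTo(a: Attribute, Literal(value, _)) =>
    // nested columns become multi-part references, e.g. FieldReference(Seq("a", "b"))
    Some(EqualTo(FieldReference(Seq(a.name)), value))
  case _ =>
    None // predicates that cannot be translated are simply not pushed down
}
```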
import org.apache.spark.sql.connector.expressions.NamedReference;

@Experimental
public abstract class FilterV2 {
The package is already v2. Do we need to add v2?
 */
public abstract NamedReference[] references();

protected NamedReference[] findReferences(Object valve)
/**
 * Methods that can be shared when upgrading the built-in Hive.
 */
trait OrcFiltersBase {
/**
 * The base class file format that is based on text file.
 */
abstract class TextBasedFileFormat extends FileFormatV2 {
I don't seem to see where TextBasedFileFormat and FileFormatV2 are used?
 * @throws IllegalArgumentException If the delete is rejected due to required effort
 */
void deleteWhere(Filter[] filters);
void deleteWhere(FilterV2[] filters);
I think that a good way to switch between v1 filters and v2 filters is to add both methods and convert from v2 to v1 in a default implementation of the v2 version. That's an easy way for people to update to the new filter API.
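A minimal sketch of that suggestion, rendered in Scala for brevity; the toV1 helper is hypothetical and only converts one filter type for illustration:

```scala
// Keep both overloads, and give the v2 overload a default implementation that
// converts to v1 filters and delegates, so existing sources need no changes.
import org.apache.spark.sql.sources
import org.apache.spark.sql.sources.v2.{EqualTo, FilterV2}

trait SupportsDelete {
  // existing v1 method that sources already implement
  def deleteWhere(filters: Array[sources.Filter]): Unit

  // new v2 overload with a default that falls back to the v1 method
  def deleteWhere(filters: Array[FilterV2]): Unit = deleteWhere(filters.map(toV1))

  // hypothetical conversion helper, shown for a single filter type only
  private def toV1(filter: FilterV2): sources.Filter = filter match {
    case EqualTo(ref, value) => sources.EqualTo(ref.fieldNames.mkString("."), value)
    case other => throw new IllegalArgumentException(s"Cannot convert to a v1 filter: $other")
  }
}
```

Sources that want the extra expressiveness of v2 filters can still override the v2 overload directly.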
import org.apache.spark.sql.connector.expressions.NamedReference;

@Experimental
public abstract class FilterV2 {
Do we want to use Filter or should we use Predicate for expressions that evaluate to a boolean?
 * @since 3.0.0
 */
@Experimental
case class EqualTo(ref: NamedReference, value: Any) extends FilterV2 {
Can FilterV2 extend the v2 Expression base?
 */
@Experimental
case class EqualTo(ref: NamedReference, value: Any) extends FilterV2 {
  override def references: Array[NamedReference] = Array(ref) ++ findReferences(value)
Why is value Any? Shouldn't it be an expression (like the v2 Literal)?
Also, in Iceberg expressions we've updated ref to be a Term instead of a Reference. Both Reference and Transform are terms, which allows us to express that the value of a transformed reference is equal to something. That gives us the ability to express date(ts) = '2020-01-17', for example.
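To illustrate the Term idea, here is a purely hypothetical sketch in the style described above; none of these names are part of this PR:

```scala
// Both plain references and transformed references are terms, so a predicate can
// constrain the result of a transform as well as a raw column.
object TermExample {
  sealed trait Term
  final case class Reference(fieldNames: Seq[String]) extends Term
  final case class ApplyTransform(name: String, term: Term) extends Term

  final case class TermEqualTo(term: Term, value: Any)

  // expresses date(ts) = '2020-01-17'
  val example: TermEqualTo =
    TermEqualTo(ApplyTransform("date", Reference(Seq("ts"))), "2020-01-17")
}
```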
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2
I think this should be in the connector.expressions package.
/**
 * Used to read and write data stored in files to/from the [[InternalRow]] format.
 */
trait FileFormatV2 {
Since the API already supports v1 Filter, I don't think we need to make these changes. We should just continue to support the v1 filters for older sources. That decouples these changes from updates to the file sources.
|
@dbtsai, this looks like a great start to me. I'd really like to see a v2 API for predicates/filters. One thing that's missing: the v2 API is written as Java interfaces, and while Spark has its own implementations that are case classes, we do need Java interfaces defined for the new filter expressions. I'd also recommend creating extractor functions like the ones we created to work with transforms. Those allow us to seamlessly use the Spark internal class names even if the source has returned a different implementation of the Java interface.
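A rough sketch of the extractor-function idea; EqualToFilter (with ref() and value() accessors) and pushDownEquality are hypothetical names used only for illustration:

```scala
// Extractor object that matches on the Java interface, so planning code can pattern
// match with Spark's own names regardless of which implementation the source returned.
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.sources.v2.FilterV2 // package as shown in this PR's diff

object EqualTo {
  def unapply(filter: FilterV2): Option[(NamedReference, Any)] = filter match {
    case e: EqualToFilter => Some((e.ref, e.value)) // EqualToFilter: hypothetical Java interface
    case _ => None
  }
}

// Usage in planning code (handler name is illustrative):
// filters.collect { case EqualTo(ref, value) => pushDownEquality(ref, value) }
```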
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.